package com.offcn.bigdata.streaming.p2

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * window ---窗口操作
  *     这里面只提供一种窗口——滑动窗口
  *  每隔N时间，去统计过去多长M时间内产生的数据。
  *
  *  实际上，流式数据因为是持续不断，无限的数据集，所以我们是没有办法进行全量统计，但是我们可以吧这个无限的数据集，
  *  划分成若干个小段，也就是将其切分成了无数的有限数据集——范围统计
  *  把这每一个有限的部分称之为一个窗口window。
  *
  *  特别的，
  *     我们把上面的N称之为窗口统计的滑动频率：slide_interval
  *     我们把上面的M称之为窗口统计的时间长度：window_length
  *
  *     同时，这两个时间，slide_interval和window_length都必须是batch_interval的整数倍。
  *
  *     如果这个窗口长度比较长，是不是就要在内存中来存放这个window的数据，所以如果内存压力比较大，而window_length比较长
  *     容易出现OOM异常。
  *
  *  一种比较特殊的窗口，就是slide_interval=window_length，把这种窗口，我们称之为滚动的窗口
  */
object _05WindowOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("_02Streaming2KafkaOps")
            .setMaster("local[*]")
            .set("spark.streaming.stopGracefullyOnShutdown", "true")//开启优雅关闭streamingContext
            .set("spark.streaming.receiver.maxRate", "10")//最大读取速率
            .set("spark.streaming.backpressure.enabled", "true")//背压机制
        val batchInterval = 2L
        val ssc = new StreamingContext(conf, Seconds(batchInterval))

        val lines = ssc.socketTextStream("bigdata01", 9999)

        val pairs = lines.flatMap(_.split("\\s+")).map((_, 1))
        //每隔2个批次，统计过去3个批次的数据
        val ret = pairs.reduceByKeyAndWindow(_+_,
            windowDuration = Seconds(batchInterval * 3),
            slideDuration = Seconds(batchInterval *2)
        )

        ret.print()

        ssc.start()
        ssc.awaitTermination()
    }
}
